{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "28d8fbc7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _components._subprocess"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7fcc07a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "import asyncio\n",
    "import platform\n",
    "import signal\n",
    "from typing import *\n",
    "from types import FrameType\n",
    "\n",
    "import asyncer\n",
    "import typer\n",
    "\n",
    "from fastkafka._components.logger import get_logger"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "170ddd1c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import os\n",
    "import platform\n",
    "from time import sleep\n",
    "\n",
    "from fastkafka._components.logger import suppress_timestamps"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62aaa890",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "\n",
    "# allows async calls in notebooks\n",
    "\n",
    "import nest_asyncio"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "55be7819",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "de03595b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "logger = get_logger(__name__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b9ee8c91",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: ok\n"
     ]
    }
   ],
   "source": [
    "suppress_timestamps()\n",
    "logger = get_logger(__name__, level=20)\n",
    "logger.info(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e44f49ae",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "async def terminate_asyncio_process(p: asyncio.subprocess.Process) -> None:\n",
    "    \"\"\"\n",
    "    Terminates an asyncio process, escalating from a polite request to a kill.\n",
    "\n",
    "    The process first gets one second to exit on its own. It is then asked to\n",
    "    stop up to three times, waiting ten seconds after each attempt, and is\n",
    "    killed if it is still running after that.\n",
    "\n",
    "    Args:\n",
    "        p: The asyncio.subprocess.Process instance.\n",
    "\n",
    "    Returns:\n",
    "        None.\n",
    "    \"\"\"\n",
    "    logger.info(f\"terminate_asyncio_process(): Terminating the process {p.pid}...\")\n",
    "    # Check if SIGINT already propagated to process\n",
    "    try:\n",
    "        await asyncio.wait_for(p.wait(), 1)\n",
    "        logger.info(\n",
    "            f\"terminate_asyncio_process(): Process {p.pid} was already terminated.\"\n",
    "        )\n",
    "        return\n",
    "    except asyncio.TimeoutError:\n",
    "        pass\n",
    "\n",
    "    for _ in range(3):\n",
    "        try:\n",
    "            if platform.system() == \"Windows\":\n",
    "                import psutil\n",
    "\n",
    "                try:\n",
    "                    parent = psutil.Process(p.pid)\n",
    "                    children = parent.children(recursive=True)\n",
    "                except psutil.NoSuchProcess:\n",
    "                    # The process is already gone, nothing is left to signal.\n",
    "                    pass\n",
    "                else:\n",
    "                    for child in children:\n",
    "                        try:\n",
    "                            child.kill()\n",
    "                        except psutil.NoSuchProcess:\n",
    "                            # This child exited on its own; keep killing the others.\n",
    "                            pass\n",
    "                    p.send_signal(signal.CTRL_BREAK_EVENT)  # type: ignore\n",
    "            else:\n",
    "                p.terminate()\n",
    "        except ProcessLookupError:\n",
    "            # Raised when the process exited after the previous wait timed out;\n",
    "            # the wait below then returns right away.\n",
    "            pass\n",
    "        try:\n",
    "            await asyncio.wait_for(p.wait(), 10)\n",
    "            logger.info(f\"terminate_asyncio_process(): Process {p.pid} terminated.\")\n",
    "            return\n",
    "        except asyncio.TimeoutError:\n",
    "            logger.warning(\n",
    "                f\"terminate_asyncio_process(): Process {p.pid} not terminated, retrying...\"\n",
    "            )\n",
    "\n",
    "    logger.warning(f\"Killing the process {p.pid}...\")\n",
    "    try:\n",
    "        p.kill()\n",
    "    except ProcessLookupError:\n",
    "        # The process exited between the last wait and the kill attempt.\n",
    "        pass\n",
    "    await p.wait()\n",
    "    logger.warning(f\"terminate_asyncio_process(): Process {p.pid} killed!\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a5b0fe1a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: terminate_asyncio_process(): Terminating the process 743...\n",
      "[INFO] __main__: terminate_asyncio_process(): Process 743 terminated.\n",
      "\u001b[?1l\u001b>?47h\u001b[1;24r\u001b[m\u001b[4l\u001b[H\u001b[2JEvery 0.1s: date\u001b[1;34Hdavor-fastkafka-devel: Tue Feb  7 15:05:41 2023\u001b[3;1HTue Feb  7 15:05:41 UTC 2023\u001b[24;80H\u001b[1;75H2\u001b[3;19H2\u001b[24;80H\u001b[1;75H3\u001b[3;19H3\u001b[24;80H\u001b[1;75H4\u001b[3;19H4\u001b[24;80H\u001b[1;75H5\u001b[3;19H5\u001b[24;80H\u001b[24;1H\u001b[2J\u001b[?47l\u001b8\n"
     ]
    }
   ],
   "source": [
    "if platform.system() == \"Windows\":\n",
    "    code = 'import datetime; print(datetime.datetime.now())'\n",
    "    proc = await asyncio.create_subprocess_exec(\n",
    "        sys.executable, '-c', code,\n",
    "        stdout=asyncio.subprocess.PIPE)\n",
    "else:\n",
    "    proc = await asyncio.create_subprocess_exec(\n",
    "        \"watch\", \"-n\", \"0.1\", \"date\", stdout=asyncio.subprocess.PIPE\n",
    "    )\n",
    "sleep(3)\n",
    "await terminate_asyncio_process(proc)\n",
    "outputs, _ = await proc.communicate()\n",
    "\n",
    "print(outputs.decode(\"utf-8\"))\n",
    "\n",
    "assert proc.returncode == 0, f\"{command} returns {proc.returncode=}, {proc.stderr=}\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "52e1c583",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "async def run_async_subprocesses(\n",
    "    commands: List[str], commands_args: List[List[Any]], *, sleep_between: int = 0\n",
    ") -> None:\n",
    "    \"\"\"\n",
    "    Runs multiple async subprocesses and supervises them until an exit signal arrives.\n",
    "\n",
    "    Every command is started with piped stdin and stdout and its output is echoed\n",
    "    line by line, prefixed with the process id. Once SIGINT or SIGTERM (or SIGBREAK\n",
    "    on Windows) is received, all processes are terminated and their remaining\n",
    "    output is echoed. The signal handlers installed here are removed again before\n",
    "    the function finishes.\n",
    "\n",
    "    Args:\n",
    "        commands: A list of commands to execute.\n",
    "        commands_args: A list of argument lists for each command.\n",
    "        sleep_between: The sleep duration in seconds between starting each subprocess.\n",
    "\n",
    "    Returns:\n",
    "        None.\n",
    "\n",
    "    Raises:\n",
    "        typer.Exit: With exit code 1 if any subprocess returned a non-zero code.\n",
    "    \"\"\"\n",
    "    loop = asyncio.get_running_loop()\n",
    "    is_windows = platform.system() == \"Windows\"\n",
    "\n",
    "    HANDLED_SIGNALS = (\n",
    "        signal.SIGINT,  # Unix signal 2. Sent by Ctrl+C.\n",
    "        signal.SIGTERM,  # Unix signal 15. Sent by `kill <pid>`.\n",
    "    )\n",
    "    if is_windows:\n",
    "        HANDLED_SIGNALS = (*HANDLED_SIGNALS, signal.SIGBREAK)  # type: ignore\n",
    "\n",
    "    # Mutable flag shared with the signal handlers\n",
    "    d = {\"should_exit\": False}\n",
    "\n",
    "    def handle_windows_exit(\n",
    "        signum: int, frame: Optional[FrameType], d: Dict[str, bool] = d\n",
    "    ) -> None:\n",
    "        d[\"should_exit\"] = True\n",
    "\n",
    "    def handle_exit(sig: int, d: Dict[str, bool] = d) -> None:\n",
    "        d[\"should_exit\"] = True\n",
    "\n",
    "    async def log_output(output: Optional[asyncio.StreamReader], pid: int) -> None:\n",
    "        # Echo the stream line by line until the process closes it\n",
    "        if output is None:\n",
    "            raise RuntimeError(\"Expected StreamReader, got None. Is stdout piped?\")\n",
    "        while not output.at_eof():\n",
    "            outs = await output.readline()\n",
    "            if outs != b\"\":\n",
    "                typer.echo(f\"[{pid:03d}]: \" + outs.decode(\"utf-8\"), nl=False)\n",
    "\n",
    "    # Handlers active before this call, kept so they can be put back on Windows\n",
    "    previous_handlers: Dict[int, Any] = {}\n",
    "    try:\n",
    "        for sig in HANDLED_SIGNALS:\n",
    "            if is_windows:\n",
    "                previous_handlers[sig] = signal.signal(sig, handle_windows_exit)\n",
    "            else:\n",
    "                loop.add_signal_handler(sig, handle_exit, sig)\n",
    "\n",
    "        async with asyncer.create_task_group() as tg:\n",
    "            tasks = []\n",
    "            for cmd, args in zip(commands, commands_args):\n",
    "                tasks.append(\n",
    "                    tg.soonify(asyncio.create_subprocess_exec)(\n",
    "                        cmd,\n",
    "                        *args,\n",
    "                        stdout=asyncio.subprocess.PIPE,\n",
    "                        stdin=asyncio.subprocess.PIPE,\n",
    "                    )\n",
    "                )\n",
    "                await asyncio.sleep(sleep_between)\n",
    "\n",
    "        procs = [task.value for task in tasks]\n",
    "\n",
    "        async with asyncer.create_task_group() as tg:\n",
    "            for proc in procs:\n",
    "                tg.soonify(log_output)(proc.stdout, proc.pid)\n",
    "\n",
    "            # Poll the flag set by the signal handlers\n",
    "            while not d[\"should_exit\"]:\n",
    "                await asyncio.sleep(0.2)\n",
    "\n",
    "            typer.echo(\"Starting process cleanup, this may take a few seconds...\")\n",
    "            for proc in procs:\n",
    "                tg.soonify(terminate_asyncio_process)(proc)\n",
    "\n",
    "        for proc in procs:\n",
    "            output, _ = await proc.communicate()\n",
    "            if output:\n",
    "                typer.echo(f\"[{proc.pid:03d}]: \" + output.decode(\"utf-8\"), nl=False)\n",
    "    finally:\n",
    "        # Do not leave our handlers installed once supervision is over\n",
    "        for sig in HANDLED_SIGNALS:\n",
    "            if is_windows:\n",
    "                previous = previous_handlers.get(sig)\n",
    "                if previous is not None:\n",
    "                    signal.signal(sig, previous)\n",
    "            else:\n",
    "                loop.remove_signal_handler(sig)\n",
    "\n",
    "    returncodes = [proc.returncode for proc in procs]\n",
    "    if any(code != 0 for code in returncodes):\n",
    "        typer.secho(\n",
    "            f\"Return codes are not all zero: {returncodes}\",\n",
    "            err=True,\n",
    "            fg=typer.colors.RED,\n",
    "        )\n",
    "        raise typer.Exit(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c92cf406",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "\n",
    "# async with asyncer.create_task_group() as tg:\n",
    "#     tg.soonify(run_async_subprocesses)([\"watch\"]*4, [[\"-n\", \"0.1\", \"date\"]]*4, sleep_between=1)\n",
    "#     await asyncio.sleep(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "804d0df0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# # | export\n",
    "\n",
    "\n",
    "# @contextmanager\n",
    "# def run_in_process(\n",
    "#     target: Callable[..., Any]\n",
    "# ) -> Generator[multiprocessing.Process, None, None]:\n",
    "#     p = multiprocessing.Process(target=target)\n",
    "#     try:\n",
    "#         p.start()\n",
    "#         yield p\n",
    "#     except Exception as e:\n",
    "#         print(f\"Exception raised {e=}\")\n",
    "#     finally:\n",
    "#         p.terminate()\n",
    "#         p.join()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fe7f93c3",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
